Real-time data ingestion to Databricks Delta Lake with Redpanda (Kafka Connect)
Introduction
We are in the era where Real-time data ingestion has become one of the critical requirements for many organizations which are seeking to tap the value from their data in Real-time and near Real-time. Real-time data ingestion has grown to become one of the brands that sets apart many organizations in the competing market, but a research from Databricks revealed that, a staggering 73% of a company's data goes unused for analytics and decision-making when stored in a data lake. This means that machine learning models are doomed to return inaccurate results, perform poorly in real-world scenarios and many other implications.
Delta lake is a game changer for big data, developed as an advanced open source storage layer, it provides an abstract layer on top of existing data lakes and it is optimized for Parquet-based storage. Databricks Delta lake provides features such as:
- Support for ACID transactions - ACID stands for Atomicity, Consistency, Isolation, and Durability. All the transactions made on the delta lakes are ACID compliant.
- Schema enforcement - When writing data to storage, Delta lakes will enforce schema which helps in maintaining the columns and data types and achieving data reliability and high-quality data.
- Scalable metadata handling - Delta lake will scale out all metadata processing operations using compute engines like Apache Spark and Apache Hive, allowing it to process the metadata for petabytes of data efficiently.
Databricks Delta lake integration can be used for a variety of use cases to store, manage, and analyze volumes of incoming data. For example, In customer analytics, Delta lakes can be used to store and process customer data for analytics purposes, and also it can be used to process IoT data because of its ability to handle large volumes of data and even performing real-time data processing.
In this tutorial, you will learn how to do the following:
- Create and configure Databricks Delta lakes
- Set up and run a Redpanda cluster and create topics for Kafka Connect usage
- Configure and run a Kafka Connect cluster for Redpanda and Databricks Delta lake integration
Deep dive into Databricks deployment architecture
Databricks deployment is structured to provide a fully managed, scalable, reliable, and secure platform for data processing and analytics tasks. It's architecture is split into two main components: the control plane and data plane and this enables secure cross-functional team collaboration while keeping a significant amount of backend services managed by Databricks.
The control plane and data plane work together to provide a smooth experience for the data teams. The separation of the control plane and data plane makes it possible for Databricks to scale each component independently which ensures the deployment architecture remains scalable, reliable and performant even as workloads grow in size and complexity.
The overall Redpanda BYOC (bring your own cloud) deployment architecture follows the same structure as that of Databricks deployment architecture in the following ways:
- Both Redpanda and Databricks architectures leverage the underlying cloud infrastructure to provide reliability, security, and scalability for data processing and analytics tasks.
- Just like Databricks, Redpanda BYOC contains the control plane which is responsible for managing Redpanda clusters including provisioning and scaling, also the data plane which is responsible for processing and storing data.
Redpanda BYOC and Databricks both provide an interactive and collaborative workspace for data teams to operate in, facilitating faster iterations and the delivery of value.
Prerequisites
You'll need the following for this tutorial:
- Docker installed on your machine, preferably Docker Desktop if you’re on Windows/Mac (this article uses Docker 20.10.21)
- A machine to install Redpanda and Kafka Connect (this tutorial uses Linux, but you can run Redpanda on Docker, MacOS, and Windows, as well)
- Python 3.10 or higher.
- Java version 8 or later( this tutorial uses java 11)
- Delta Lake 1.2.1
All of the code resources for this tutorial can be found in this repository.
Use case: Implementing data ingestion to Databricks Delta lakes with Redpanda (Kafka Connect)
In this part we create a fictitious scenario to help you understand how you can ingest data to Databricks Delta Lake using Kafka Connect with Redpanda. This is just for demonstration purposes only.
Let us imagine that you work as a Lead engineer at a e-commerce company known as EasyShop,that sells products its website. Your company is experiencing a rapid growth and due to this it is expanding its operations to new markets.
To better understand customer behavior and preferences, the company wants to create a unified view of its sales and customer data which includes user clicks, page views, and orders. To achieve this, you decide to set up a data pipeline that ingests data in Real-time from the company website into Databricks Delta Lake.
The following diagram explains the high-level architecture:
Setting up Redpanda
In this article, we assume that you already have Redpanda installed via Docker. If you have note installed, please refer to this Documentation on how to installed Redpanda via Docker.
To check if Redpanda is up and running, execute the command docker ps
. You’ll see an output like this if the service is running:
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
bb2305256285 vectorized/redpanda "/entrypoint.sh redp…" 16 minutes ago Up 16 minutes 8081/tcp, 9092/tcp, 9644/tcp, 0.0.0.0:8082->8082/tcp, :::8082->8082/tcp redpanda-1
You can verify that Redpanda is running via the rpk CLI on the Docker by executing this command docker exec -it redpanda-1 rpk cluster info
where redpanda-1
is the container_name.
The output of the command should look similar to the following:
CLUSTER
=======
redpanda.36dda01b-4e8a-4949-bb38-04f83a13b009
BROKERS
=======
ID HOST PORT
0* 0.0.0.0 9092
Creating a Redpanda Topic
To create a topic in Redpanda, you can use rpk, which is a CLI tool for connecting to and interacting with Redpanda brokers. Open a terminal and connect to Redpanda’s container session by executing the command docker exec -it redpanda-1 bash
. You should see your terminal session connected now to the redpanda-1
container:
redpanda@bb2305256285:/$
Next, execute the command rpk topic create customer-data
to create the topic customer-data
. You can check the existence of the topic created with the command rpk topic list
. This will list all the topics which are up and running and you should see an output like this:
NAME PARTITIONS REPLICAS
customer-data 1 1
OR you can verify the created topics by getting the cluster information using the command rpk cluster info
.
You will see the following output:
CLUSTER
=======
redpanda.36dda01b-4e8a-4949-bb38-04f83a13b009
BROKERS
=======
ID HOST PORT
0* 0.0.0.0 9092
TOPICS
======
NAME PARTITIONS REPLICAS
customer-data 1 1
Developing the producer code for a scenario
You have Redpanda services running and a topic available in the Redpanda container to receive events, move on to create an application that will publish messages to this topic.
This will be a two-step process:
- First, you will set up a virtual environment and install the necessary dependencies.
- Create a producer code to generate sample customer data and publish it to the created
customer-data
topic.
Setting up virtual environment
Before creating a producer and consumer code, you have to set up your Python virtual environment and install the dependencies.
To begin, create a project directory, Real-time data ingestion to Databricks Delta Lake with Redpanda (Kafka Connect)
, in your machine.
Create a subdirectory customer-data
in the project directory. The customer-data
directory will hold the Python application that will publish our data.
While inside the customer-data
subdirectory, run python3 -m venv venv
command to create a virtual environment setup for this demo project.
In the same subdirectory, create a requirements.txt
file and paste the content from the GitHub Demo repo.
To install the dependencies, run pip install -r requirements.txt
.
Producing data
Create a file called redpanda_producer.py
that has producer code that generates and inserts 10000 random JSON entries into the customer-data
topic.
Paste this piece of code into the file:
from kafka import KafkaProducer, KafkaAdminClient
from kafka.admin import NewTopic
from time import sleep
from faker import Faker
import faker_commerce
from faker.providers import internet
import json
import os
from dotenv import load_dotenv
def main():
topic_name = os.environ.get('REDPANDA_TOPIC')
broker_ip = os.environ.get('REDPANDA_BROKER_IP')
types_of_categories = ['clothing', 'electronics', 'home & kitchen', 'beauty & personal care', 'toys & games']
fake = Faker()
fake.add_provider(faker_commerce.Provider)
fake.add_provider(internet)
try:
# Create Kafka topic
topic = NewTopic(name=topic_name, num_partitions=1, replication_factor=1)
admin = KafkaAdminClient(bootstrap_servers=broker_ip)
admin.create_topics([topic])
except Exception:
print(f"Topic {topic_name} is already created")
producer = KafkaProducer(bootstrap_servers=[broker_ip],
value_serializer=lambda m: json.dumps(m).encode('ascii'))
for i in range(10000):
fake_date = fake.date_this_month(True)
product_id = fake.pyint(1, 10000)
categories = fake.word(ext_word_list=types_of_categories)
product_name = fake.product_name()
name = fake.name()
email_addr = fake.email(name)
str_date = str(fake_date)
units_sold = fake.pyint(1, 25)
unit_price = fake.pyint(10, 500)
country = fake.country()
producer.send(topic_name, {'date': str_date, 'product_name': product_name, 'category': categories,
'name': name, 'email': email_addr, 'units_sold': units_sold,
'unit_price': unit_price, 'country': country})
print("Inserted entry ", i, " to topic", topic_name)
sleep(1)
load_dotenv()
main()
To run the script, execute the command python redpanda_producer.py
. If it runs successfully, you should see the messages produced on the topics.
You can also check the output using Redpanda’s CLI tool, rpk
, by running the following command:
rpk topic consume customer-data --brokers <IP:PORT>
You should see the output below, which has the topic
, value
, timestamp
, partition
, and offset
fields.
Configuring Databricks Delta lake
In this section, you'll configure Delta Lake using Spark session. For this, we will use the configure_delta-lake.py
script from our GitHub Demo repo.
Running the configure_delta-lake
script will accomplish the following:
- Generate a Spark session
- Obtain the schema from the topic we are generating the data
- Create the Delta table
Let us look deep into the steps
Using Pyspark to Create a Spark Session
You can create a new Spark session with the correct packages and add the corresponding additional Delta Lake dependencies to interact with Delta Lake. You can do this using the following code:
import os
from dotenv import load_dotenv
from delta import *
import pyspark as pyspark
def get_spark_session():
load_dotenv()
broker_ip = os.environ.get('REDPANDA_BROKER_IP')
topic = os.environ.get('REDPANDA_TOPIC')
builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = spark = configure_spark_with_delta_pip(builder).getOrCreate()
return spark
You can find the packages used inside the .env
file.
REDPANDA_BROKER_IP=0.0.0.0:9092
REDPANDA_TOPIC=customer-data
PYSPARK_SUBMIT_ARGS='--packages io.delta:delta-core_2.12:1.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 pyspark-shell'
DELTA_LAKE_TABLE_DIR='/tmp/delta/customer-table'
DELTA_LAKE_CHECKOUT_DIR='/tmp/checkpoint/'
Once you've set up the Spark session, you'll use it to obtain the schema of the customer-data
topic. You can find this logic in the get_schema()
function inside the get_schema.py
file.
def get_schema(spark):
load_dotenv()
broker_ip = os.environ.get('REDPANDA_BROKER_IP')
topic = os.environ.get('REDPANDA_TOPIC')
df_json = (spark.read
.format("kafka")
.option("kafka.bootstrap.servers", broker_ip)
.option("subscribe", topic)
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.option("failOnDataLoss", "false")
.load()
.withColumn("value", expr("string(value)"))
.filter(col("value").isNotNull())
.select("key", expr("struct(offset, value) r"))
.groupBy("key").agg(expr("max(r) r"))
.select("r.value"))
df_read = spark.read.json(df_json.rdd.map(lambda x: x.value), multiLine=True)
return df_read.schema.json()
With the Spark session running and the schema from the topic, you can go ahead to create the Delta table. The paths are configurable and can be modified as per your preference by editing the .env
file.
You can use the following code to create a Delta table:
def create_delta_tables(spark, table_df):
load_dotenv()
table_path = os.environ.get('DELTA_LAKE_TABLE_DIR')
table = (spark
.createDataFrame([], table_df.schema)
.write
.option("mergeSchema", "true")
.format("delta")
.mode("append")
.save(table_path))
You've successfully configured Databricks Delta lake and created a Delta table for the data to be ingested.
Setting up Kafka Connect
Kafka Connect is an integration tool released with the Apache KafkaⓇ project. It’s scalable and flexible, and it provides reliable data streaming between Apache Kafka and external systems. You can use it to integrate with any system, including databases, search indexes, and cloud storage providers. Redpanda is fully compatible with the Kafka API.
Kafka Connect uses source and sink connectors for integration. Source connectors stream data from an external system to Kafka, while sink connectors stream from Kafka to an external system.
To install Kafka connect, you have to download the Apache Kafka package. Navigate to the Apache downloads page for Kafka and click the suggested download link for the Kafka 3.1.0 binary package.
Run the following commands to create a folder called pandabooks_integration
in your home directory and extract the Kafka binaries file to this directory.
mkdir pandabooks_integration && \
mv ~/Downloads/kafka_2.13-3.1.0.tgz pandabooks_integration && \
cd pandabooks_integration && \
tar xzvf kafka_2.13-3.1.0.tgz
Configuring the Connect Cluster
Before running a Kafka connect cluster, you have to set up a configuration file in the properties format.
Navigate to the pandabooks_integration
and create a folder called configuration
. While inside the folder, create a file known as connect.properties
and add the following lines to it:
# Bootstrap servers
bootstrap.servers=localhost:9092
# The converters specify the format of data in Kafka and in Connect external systems
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# The JSON converter automatically serializes JSON data from Kafka
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# The internal converter used for offsets and configurations
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
# Write path for offset data
offset.storage.file.filename=/tmp/connect.offsets
# The Kafka group to use when storing offset data in Kafka. This should be unique for each Kafka Connect cluster
group.id=connect-cluster
# Default topic settings
offset.flush.interval.ms=10000
Creating Source and Sink Connectors
With the Kafka Connect cluster configured, you can now create source and sink connectors for data integration. In this example, we'll create a source connector to stream data from a Kafka topic to a file sink connector that writes data to a file system.
Creating a Source Connector
To create a source connector, you need to specify the connector class, the Kafka topic to consume from, and the output format. You can use the following configuration for the source connector:
curl -X POST \
-H "Content-Type: application/json" \
--data '{
"name": "source-connector",
"config": {
"connector.class": "FileStreamSource",
"tasks.max": "1",
"file": "/tmp/source-data.txt",
"topic": "customer-data"
}
}' \
http://localhost:8083/connectors
This command creates a source connector named source-connector
that reads data from the customer-data
topic and writes it to the /tmp/source-data.txt
file.
Creating a Sink Connector
Similarly, you can create a sink connector to write data from Kafka to a file system. Here's an example configuration for a sink connector:
curl -X POST \
-H "Content-Type: application/json" \
--data '{
"name": "sink-connector",
"config": {
"connector.class": "FileStreamSink",
"tasks.max": "1",
"file": "/tmp/sink-data.txt",
"topics": "customer-data"
}
}' \
http://localhost:8083/connectors
This command creates a sink connector named sink-connector
that reads data from the customer-data
topic and writes it to the /tmp/sink-data.txt
file.
Starting and Monitoring Connectors
Once you've created the source and sink connectors, you can start them using the following commands:
curl -X POST http://localhost:8083/connectors/source-connector/tasks/0/resume
curl -X POST http://localhost:8083/connectors/sink-connector/tasks/0/resume
You can monitor the status of your connectors by making a GET request to their respective endpoints:
curl -X GET http://localhost:8083/connectors/source-connector/status
curl -X GET http://localhost:8083/connectors/sink-connector/status
This will provide you with information about the status, tasks, and configurations of your connectors.
Conclusion
In this article, we've covered how to set up Databricks Delta Lake for streaming data ingestion and how to configure Kafka Connect for data integration. By following these steps, you can efficiently stream data from Apache Kafka to Delta Lake and perform real-time analytics on your data.
We've also demonstrated how to create source and sink connectors in Kafka Connect to stream data between Kafka and external systems. With these tools, you can build robust data pipelines for your streaming applications.
Thank you for reading!
Leave a comments